-
Couldn't load subscription status.
- Fork 1.9k
[kernel-spark] Add rate limiting to getFileChanges() for DSv2 streaming #5361
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Conversation
|
Hi @huan233usc @gengliangwang @tdas @jerrypeng could you please review this PR? |
|
|
||
| /** | ||
| * Interface for files that can be admitted by admission control in Delta streaming sources. | ||
| * This abstraction allows both Scala and Java IndexedFile implementations to be used with |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: Scala and Java -> v1 and v2
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
|
|
||
| /** | ||
| * Returns the size of the file in bytes. | ||
| * For files without a file action or files with unknown size, returns 0. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we just make this api can only be called when file action exists?
V1's current impl will just throw NPE if hasFileAction() is false
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
|
|
||
| @Override | ||
| public long getFileSize() { | ||
| return addFile.getSize(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: null check?
🥞 Stacked PR
Use this link to review incremental changes.
Which Delta project/connector is this regarding?
Description
This PR is adds rate limiting to getFileChange() which reads delta metadata to determines what data to process (offset management).
How was this patch tested?
Parameterized tests verifying parity between DSv1 (DeltaSource) and DSv2 (SparkMicroBatchStream).
Does this PR introduce any user-facing changes?
No